/**
 * Computes the mean (average) duration of events from job trace records.
 * 
 * @author cristina
 */

package org.PP;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class MediumEvents_straight {

	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, DoubleWritable> {

		/**
		 * Emits the duration (end - start) of each job record under the
		 * single key {@code "medium duration"} so the reducer can average
		 * them.
		 *
		 * Input lines are whitespace-separated; columns 7 and 8 hold the
		 * job's start and end timestamps. Lines beginning with '#' are
		 * treated as comments and skipped. Lines with fewer than 8 columns
		 * still emit a duration of 0, matching the job's historical output.
		 *
		 * @param key      byte offset of the line (unused)
		 * @param value    one line of the trace file
		 * @param output   collector receiving (key, duration) pairs
		 * @param reporter progress reporter (unused)
		 * @throws IOException if the collector fails
		 */
		public void map(LongWritable key, Text value,
				OutputCollector<Text, DoubleWritable> output, Reporter reporter)
				throws IOException {

			String line = value.toString();
			// Skip comment lines.
			if (line.startsWith("#")) {
				return;
			}

			// Columns 7 and 8 are the start and end time of the job.
			// Primitives avoid the deprecated new Double(...) boxing the
			// original used.
			StringTokenizer tokenizer = new StringTokenizer(line);
			double start = 0.0;
			double end = 0.0;
			int count = 0;
			while (tokenizer.hasMoreTokens()) {
				String token = tokenizer.nextToken();
				count++;
				if (count == 7) {
					start = Double.parseDouble(token);
				} else if (count == 8) {
					end = Double.parseDouble(token);
					// Both timestamps found; no need to scan the rest.
					break;
				}
			}

			output.collect(new Text("medium duration"),
					new DoubleWritable(end - start));
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<Text, DoubleWritable, Text, DoubleWritable> {

		/**
		 * Averages all durations received for a key and emits the mean as a
		 * single DoubleWritable. An empty value iterator yields a mean of 0.
		 *
		 * @param key      the grouping key (here always "medium duration")
		 * @param values   durations emitted by the mappers
		 * @param output   collector receiving (key, mean) pairs
		 * @param reporter progress reporter (unused)
		 * @throws IOException if the collector fails
		 */
		public void reduce(Text key, Iterator<DoubleWritable> values,
				OutputCollector<Text, DoubleWritable> output, Reporter reporter)
				throws IOException {
			// Primitive accumulators: the original's boxed Double/Long
			// counters autoboxed on every iteration via deprecated
			// new Double(0)/new Long(0) constructors.
			double sum = 0.0;
			long count = 0;

			while (values.hasNext()) {
				sum += values.next().get();
				count++;
			}

			// Guard against division by zero on an empty group.
			double mean = (count > 0) ? sum / count : 0.0;
			output.collect(key, new DoubleWritable(mean));
		}
	}
	
	

	/**
	 * Configures and runs the job: mappers emit per-event durations, and the
	 * reducer averages them into a single mean per key.
	 *
	 * @param args args[0] = input path, args[1] = output path
	 * @throws Exception if job submission or execution fails
	 */
	public static void main(String[] args) throws Exception {
		// Fail fast with a usage message instead of an
		// ArrayIndexOutOfBoundsException when paths are missing.
		if (args.length < 2) {
			System.err.println("Usage: MediumEvents_straight <input path> <output path>");
			System.exit(2);
		}

		JobConf conf = new JobConf(MediumEvents_straight.class);
		conf.setJobName("MediumEvents_straight");

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(DoubleWritable.class);

		conf.setMapperClass(Map.class);
		// No combiner: averaging is not associative, so combining partial
		// means before the reducer would only approximate the true mean.
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		conf.setNumReduceTasks(5);

		JobClient.runJob(conf);
	}
}
